package org.apache.flink.cdc.connectors.gaussdb.source;

import com.huawei.gaussdb.jdbc.PGProperty;
import com.huawei.gaussdb.jdbc.jdbc.PgConnection;
import com.huawei.gaussdb.jdbc.replication.PGReplicationStream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * Flink source that emits the logical-decoding output of a GaussDB replication slot as strings.
 *
 * <p>Lifecycle:
 * <ol>
 *   <li>{@link #open(Configuration)} loads the driver, opens the replication connection and looks
 *       up whether the configured slot already exists.</li>
 *   <li>{@link #run(SourceContext)} drops the slot if it exists, creates it again, starts a
 *       logical replication stream on it and forwards decoded messages downstream.</li>
 *   <li>{@link #cancel()} / {@link #close()} stop the loop and close the replication
 *       connection.</li>
 * </ol>
 *
 * <p>Properties read by this class: {@code replication_slot} (required), {@code user},
 * {@code password}, {@code parallel-decode-num} (default {@code 2}), {@code white-table-list},
 * {@code decode-style} (default {@code j}) and {@code sending-batch} (default {@code 0}).
 *
 * <p>NOTE(review): because the slot is dropped and re-created on every start, streaming always
 * begins from the newly created slot; confirm that losing the previous slot position is intended.
 *
 * @param <T> unused; kept so existing parameterized references keep compiling
 */
public class GaussDBSourceFunction<T> extends RichSourceFunction<String> {

    private static final long serialVersionUID = 1L;

    /** Fully qualified name of the GaussDB JDBC driver. */
    private static final String DRIVER_CLASS = "com.huawei.gaussdb.jdbc.Driver";

    /** Property key that holds the logical replication slot name. */
    private static final String SLOT_PROPERTY = "replication_slot";

    /** Only messages strictly longer than this are emitted (filters out begin/commit records). */
    private static final int MAX_SKIPPED_LENGTH = 40;

    /** Pause between polls when the stream has nothing pending. */
    private static final long IDLE_SLEEP_MILLIS = 10L;

    /** Replication connection; created in open(), so it must not be serialized with the function. */
    transient volatile PgConnection conn;
    Properties properties;
    String url;
    /** Name of the slot found on the server during open(), or null when it does not exist yet. */
    String slotName;

    /** Cleared by cancel()/close() to make the read loop in run() terminate. */
    private volatile boolean running = true;

    /**
     * @param url        JDBC URL of the replication endpoint
     * @param properties connection and decoding properties (see the class description)
     */
    public GaussDBSourceFunction(String url, Properties properties) {
        this.url = url;
        this.properties = properties;
    }

    /**
     * Loads the driver, opens the replication connection and records whether the configured slot
     * already exists on the server.
     *
     * @throws IllegalArgumentException if the {@code replication_slot} property is missing or empty
     * @throws IllegalStateException    if the GaussDB JDBC driver class cannot be loaded
     * @throws SQLException             if either connection or the slot lookup fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        String requestedSlot = properties.getProperty(SLOT_PROPERTY);
        if (requestedSlot == null || requestedSlot.isEmpty()) {
            throw new IllegalArgumentException("Missing required property '" + SLOT_PROPERTY + "'");
        }

        // Fail fast: without the driver nothing below can work, and continuing would only
        // surface later as a NullPointerException in run().
        try {
            Class.forName(DRIVER_CLASS);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(
                    "GaussDB JDBC driver not found on the classpath: " + DRIVER_CLASS, e);
        }

        // These three properties are mandatory for logical replication.
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
        PGProperty.REPLICATION.set(properties, "database");
        PGProperty.PREFER_QUERY_MODE.set(properties, "simple");

        // Replication (HA) connection used for slot management and streaming.
        conn = (PgConnection) DriverManager.getConnection(url, properties);

        slotName = findExistingSlot(requestedSlot);
    }

    /**
     * Looks up the slot in {@code pg_catalog.pg_replication_slots} over a short-lived plain SQL
     * connection, which is closed before returning.
     *
     * @return the slot name as stored on the server, or {@code null} if no such slot exists
     */
    private String findExistingSlot(String requestedSlot) throws SQLException {
        // NOTE(review): the SQL endpoint is derived by textually replacing "8001" with "8000"
        // anywhere in the URL; this also rewrites a host or database name containing "8001".
        String sqlUrl = url.replace("8001", "8000");

        String found = null;
        try (Connection sqlConnection = DriverManager.getConnection(
                     sqlUrl, properties.getProperty("user"), properties.getProperty("password"));
             PreparedStatement lookup = sqlConnection.prepareStatement(
                     "select slot_name from pg_catalog.pg_replication_slots where slot_name = ?")) {
            // Slot names containing upper-case letters are converted to lower case on creation
            // (see createReplicationSlotName), so compare against the lower-cased name.
            lookup.setString(1, requestedSlot.toLowerCase(Locale.ROOT));
            try (ResultSet rows = lookup.executeQuery()) {
                while (rows.next()) {
                    found = rows.getString(1);
                }
            }
        }
        return found;
    }

    /**
     * Re-creates the replication slot, starts streaming from it and emits every decoded message
     * longer than {@value #MAX_SKIPPED_LENGTH} characters until the source is cancelled.
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // An existing slot is dropped first so that creation below cannot collide with it.
        if (slotName != null) {
            dropReplicationSlotName(conn, properties);
        }
        createReplicationSlotName(conn, properties);

        System.out.println("slotName:" + properties.getProperty("replication_slot"));

        PGReplicationStream stream = openReplicationStream(conn, properties);

        // NOTE(review): no applied/flushed LSN is ever reported back on this stream; confirm
        // whether the server needs that feedback to release retained log.
        while (running) {
            ByteBuffer pending;
            try {
                pending = stream.readPending();
            } catch (SQLException e) {
                if (!running) {
                    // cancel() closed the connection under us; this is a normal shutdown.
                    break;
                }
                throw e;
            }

            if (pending == null) {
                TimeUnit.MILLISECONDS.sleep(IDLE_SLEEP_MILLIS);
                continue;
            }

            String message = decode(pending);
            // Short messages are the begin/commit markers; only real change records are emitted.
            if (message.length() > MAX_SKIPPED_LENGTH) {
                ctx.collect(message);
            }
        }
    }

    /** Decodes the readable bytes of an array-backed buffer as UTF-8 text. */
    private static String decode(ByteBuffer buffer) {
        return new String(
                buffer.array(),
                buffer.arrayOffset() + buffer.position(),
                buffer.remaining(),
                StandardCharsets.UTF_8);
    }

    /**
     * Stops the read loop and closes the replication connection so that a read in progress does
     * not keep the source alive.
     *
     * @throws RuntimeException if closing the connection fails
     */
    @Override
    public void cancel() {
        running = false;
        PgConnection current = conn;
        if (current == null) {
            // open() never got as far as connecting; nothing to release.
            return;
        }
        try {
            current.close();
        } catch (SQLException e) {
            throw new RuntimeException("Failed to close the GaussDB replication connection", e);
        }
    }

    /** Releases the replication connection when the function is torn down. */
    @Override
    public void close() throws Exception {
        running = false;
        try {
            PgConnection current = conn;
            if (current != null) {
                current.close();
            }
        } finally {
            super.close();
        }
    }

    /**
     * Creates the logical replication slot named by the {@code replication_slot} property, using
     * the {@code mppdb_decoding} output plugin.
     */
    public static void createReplicationSlotName(PgConnection conn, Properties properties) throws SQLException {
        conn.getReplicationAPI()
                .createReplicationSlot()
                .logical()
                // A name containing upper-case letters is converted to lower case automatically.
                .withSlotName(properties.getProperty("replication_slot"))
                // Decoding plugin; fixed value.
                .withOutputPlugin("mppdb_decoding")
                .make();
    }

    /**
     * Starts a logical replication stream on the configured slot. The slot must already exist.
     *
     * <p>The stream handle is not returned, so callers of this method cannot read from or close
     * it; {@link #run(SourceContext)} obtains its own stream instead.
     */
    public static void startreplicationStream(PgConnection conn, Properties properties) throws SQLException {
        openReplicationStream(conn, properties);
    }

    /** Builds and starts the logical replication stream with the configured decoding options. */
    private static PGReplicationStream openReplicationStream(PgConnection conn, Properties properties)
            throws SQLException {
        // NOTE(review): "white-table-list" has no default, so a null value is passed through to
        // the driver when the property is absent; confirm the driver accepts that.
        return conn
                .getReplicationAPI()
                .replicationStream()
                .logical()
                .withSlotName(properties.getProperty("replication_slot"))
                // Whether empty transactions are skipped while decoding.
                .withSlotOption("skip-empty-xacts", true)
                // Number of parallel decoding threads.
                .withSlotOption("parallel-decode-num", Integer.parseInt(properties.getProperty("parallel-decode-num", "2")))
                // Tables to include.
                .withSlotOption("white-table-list", properties.getProperty("white-table-list"))
                // Output format: j = JSON, t = text.
                .withSlotOption("decode-style", properties.getProperty("decode-style", "j"))
                // Whether decoded results are sent in batches.
                .withSlotOption("sending-batch", Integer.parseInt(properties.getProperty("sending-batch", "0")))
                .start();
    }

    /** Drops the replication slot named by the {@code replication_slot} property. */
    public static void dropReplicationSlotName(PgConnection conn, Properties properties) throws SQLException {
        conn.getReplicationAPI()
                .dropReplicationSlot(properties.getProperty("replication_slot"));
    }
}



